﻿using Devonline.Core;
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;

namespace Devonline.MessageQueue.Pulsar;

/// <summary>
/// Pulsar 消息对了服务的直接接口
/// </summary>
public interface IPulsarService : IMessageQueueService;

/// <summary>
/// 使用 apache pulsar 实现的消息队列客户端
/// </summary>
/// <param name="logger">日志</param>
/// <param name="endpoint">消息队列设置</param>
public sealed class PulsarService(ILogger<PulsarService> logger, IPulsarEndpoint endpoint) : IPulsarService, IMessageQueueService
{
    private readonly ILogger<PulsarService> _logger = logger;
    private readonly IPulsarEndpoint _endpoint = endpoint;
    private readonly Dictionary<string, IProducer> _producers = [];
    private readonly Dictionary<string, IConsumer> _consumers = [];
    private IPulsarClient? _client;
    public const string METHOD_NEW_CONSUMER = "NewConsumer";
    public const string METHOD_NEW_PRODUCER = "NewProducer";

    /// <summary>
    /// 启动消息队列客户端
    /// </summary>
    public async Task StartAsync()
    {
        if (_client is null)
        {
            _logger.LogDebug("将创建并启动消息队列客户端 {clientName}", _endpoint.Name);
            ArgumentException.ThrowIfNullOrWhiteSpace(_endpoint.Host);
            _client = PulsarClient.Builder().ServiceUrl(new Uri(_endpoint.Host)).Build();
            _logger.LogInformation("已创建并启动消息队列客户端 {clientName}", _endpoint.Name);
        }

        await Task.CompletedTask;
    }
    /// <summary>
    /// 停止所有生产者和消费者以及客户端对象
    /// </summary>
    /// <returns></returns>
    public async Task StopAsync()
    {
        if (_client is not null)
        {
            _logger.LogDebug("将释放全部生产者");
            foreach (var producer in _producers.Values)
            {
                await producer.DisposeAsync();
            }

            _producers.Clear();
            _logger.LogInformation("已释放全部生产者");

            _logger.LogDebug("将取消订阅并释放全部消费者");
            foreach (var consumer in _consumers.Values)
            {
                await consumer.Unsubscribe();
                await consumer.DisposeAsync();
            }

            _consumers.Clear();
            _logger.LogInformation("已取消订阅并释放全部消费者");
            await _client.DisposeAsync();
            _logger.LogInformation("已释放消息队列客户端");
        }
    }
    /// <summary>
    /// 停止一个 topic 对应的生产者或者消费者
    /// </summary>
    /// <param name="topic"></param>
    /// <returns></returns>
    public async Task StopAsync(string topic)
    {
        var producer = _producers.GetValue(topic);
        if (producer is not null)
        {
            _logger.LogDebug("将释放当前主题: {topic} 的生产者", topic);
            await producer.DisposeAsync();
            _producers.Remove(topic);
            _logger.LogInformation("已释放当前主题: {topic} 的生产者", topic);
        }

        var consumer = _consumers.GetValue(topic);
        if (consumer is not null)
        {
            _logger.LogDebug("将取消订阅并释放当前主题: {topic} 的消费者", topic);
            await consumer.Unsubscribe();
            await consumer.DisposeAsync();
            _consumers.Remove(topic);
            _logger.LogInformation("已取消订阅并释放当前主题: {topic} 的消费者", topic);
        }
    }

    /// <summary>
    /// 发布消息到主题
    /// </summary>
    /// <typeparam name="TMessage">消息类型</typeparam>
    /// <param name="topic">发布主题</param>
    /// <param name="message">消息内容</param>
    /// <returns></returns>
    public async Task PublishAsync<TMessage>(string topic, TMessage message)
    {
        var producer = _producers.GetValue(topic);
        if (producer is null && _client is not null)
        {
            _logger.LogDebug("将创建主题 {topic} 的生产者", topic);
            var builder = typeof(PulsarClientExtensions).InvokeGenericMethod<IProducerBuilder<TMessage>>(METHOD_NEW_PRODUCER, _client, new object[] { GetSchema<TMessage>() }, typeof(TMessage));
            if (builder is not null)
            {
                producer = builder.Topic(topic).Create();
                _producers.SetValue(topic, producer);
                _logger.LogInformation("已创建主题: {topic} 的生产者", topic);
            }
        }

        if (producer is not null && producer is ISend<TMessage> sendProducer)
        {
            var messageId = await sendProducer.Send(message);
            _logger.LogInformation("已向主题: {topic} 发送消息: " + messageId, topic);
        }
    }
    /// <summary>
    /// 从主题订阅消息
    /// </summary>
    /// <typeparam name="TMessage">消息类型</typeparam>
    /// <param name="topic">订阅主题</param>
    /// <param name="subscriptionName">订阅名称</param>
    /// <param name="onSubscribe">接收订阅的方法委托</param>
    /// <param name="subscriptionType">订阅类型</param>
    /// <returns></returns>
    public async Task SubscribeAsync<TMessage>(string topic, string subscriptionName, Func<TMessage, Task> onSubscribe, string? subscriptionType = default) where TMessage : new()
    {
        var type = GetSubscriptionType(subscriptionType);
        var consumer = _consumers.GetValue(topic);
        if (consumer is null && _client is not null)
        {
            _logger.LogDebug("将创建主题: {topic} 的消费者", topic);
            var builder = typeof(PulsarClientExtensions).InvokeGenericMethod<IConsumerBuilder<TMessage>>(METHOD_NEW_CONSUMER, _client, new object[] { GetSchema<TMessage>() }, typeof(TMessage));
            if (builder is not null)
            {
                consumer = builder.Topic(topic).SubscriptionName(subscriptionName).SubscriptionType(type).Create();
                _logger.LogInformation("已创建主题: {topic} 的消费者", topic);

                _consumers.SetValue(topic, consumer);
            }
        }

        if (consumer is not null && consumer is IConsumer<TMessage> receiveConsumer)
        {
            new Task(async () => await receiveConsumer.Process<TMessage>(async (message, cancellationToken) =>
            {
                _logger.LogDebug("已收到主题: {topic} 的消息, 将调用订阅方法, 消息: " + message.MessageId, topic);
                await onSubscribe(message.Value());
                _logger.LogInformation("已收到主题: {topic} 的消息, 已调用订阅方法, 消息: " + message.MessageId, topic);
            }, GetProcessingOptions(type))).Start();
        }

        await Task.CompletedTask;
    }
    /// <summary>
    /// 订阅文本消息
    /// </summary>
    /// <param name="topic"></param>
    /// <param name="subscriptionName"></param>
    /// <param name="onSubscribe"></param>
    /// <param name="subscriptionType"></param>
    /// <returns></returns>
    public async Task SubscribeAsync(string topic, string subscriptionName, Func<string, Task> onSubscribe, string? subscriptionType = null)
    {
        var type = GetSubscriptionType(subscriptionType);
        var consumer = _consumers.GetValue(topic);
        if (consumer is null && _client is not null)
        {
            _logger.LogDebug("将创建主题: {topic} 的消费者", topic);
            var builder = typeof(PulsarClientExtensions).InvokeGenericMethod<IConsumerBuilder<string>>(METHOD_NEW_CONSUMER, _client, new object[] { Schema.String }, typeof(string));
            if (builder is not null)
            {
                consumer = builder.Topic(topic).SubscriptionName(subscriptionName).SubscriptionType(type).Create();
                _logger.LogInformation("已创建主题: {topic} 的消费者", topic);

                _consumers.SetValue(topic, consumer);
            }
        }

        if (consumer is not null && consumer is IConsumer<string> receiveConsumer)
        {
            new Task(async () => await receiveConsumer.Process<string>(async (message, cancellationToken) =>
            {
                _logger.LogDebug("已收到主题: {topic} 的消息, 将调用订阅方法, 消息: " + message.MessageId, topic);
                await onSubscribe(message.Value());
                _logger.LogInformation("已收到主题: {topic} 的消息, 已调用订阅方法, 消息: " + message.MessageId, topic);
            }, GetProcessingOptions(type))).Start();
        }

        await Task.CompletedTask;
    }

    /// <summary>
    /// 根据输入类型获取订阅类型
    /// </summary>
    /// <param name="subscriptionType">输入的订阅类型</param>
    /// <returns></returns>
    private SubscriptionType GetSubscriptionType(string? subscriptionType = default) => string.IsNullOrWhiteSpace(subscriptionType) ? SubscriptionType.Shared : Enum.Parse<SubscriptionType>(subscriptionType);
    /// <summary>
    /// 根据类型获取 Schema, 目前仅支持 string 和 byte[]
    /// </summary>
    /// <typeparam name="TType"></typeparam>
    /// <returns></returns>
    private object GetSchema<TType>()
    {
        var type = typeof(TType);
        if (type == typeof(string))
        {
            return Schema.String;
        }
        else
        {
            return Schema.ByteArray;
        }
    }
    /// <summary>
    /// 获取执行选项
    /// </summary>
    /// <param name="subscriptionType"></param>
    /// <returns></returns>
    private ProcessingOptions GetProcessingOptions(SubscriptionType subscriptionType)
    {
        return new ProcessingOptions
        {
            EnsureOrderedAcknowledgment = subscriptionType != SubscriptionType.Shared && subscriptionType != SubscriptionType.KeyShared,
        };
    }
}